Java NIO - 基础之 Selector、Channel、Buffer

NIO 简介

NIO是什么?

本质上,Java NIO 底层的实现是IO多路复用模型,只不过 Java NIO 组件提供了统一的 API,为大家屏蔽了底层的操作系统的差异。

  • new IO 或 non blocking IO,java 1.4开始引入(JDK1.7+中的NIO统称为NIO.2)可以替代标准的java io api;
  • 与旧IO有着相同的作用和目的,但使用方式完全不同,NIO面向缓冲区、基于通道,文件的读写更加灵活高效。

Java NIO类库包含以下三个核心组件:

  • Channel(通道)
  • Buffer(缓冲区)
  • Selector(选择器)


与传统IO有什么区别?

IO NIO
面向流(I/O Stream Oriented) 面向缓冲区(Buffer Oriented)
阻塞IO(Blocking IO) 非阻塞式IO(Non Blocking IO)
无选择器的概念 有选择器,需要底层操作系统提供支持


啥是通道、缓冲区?

NIO的核心在于通道(channel)和缓冲区(buffer)。通道表示打开到IO设备(例如文件、套接字)的连接,若需要NIO系统,需要获取用于连接IO系统设备的通道以及用于容纳数据的缓冲区,然后操作缓冲区,对数据进行处理。简而言之,channel负责搭建传输通道,buffer负责数据的存取

具体来说,在OIO中,同一个网络连接会关联到两个流:一个是输入流 (Input Stream),另一个是输出流(Output Stream)。Java应用程序通过这两个流不断地进行输入和输出的操作。 在NIO中,一个网络连接使用一个通道表示,所有NIO的IO操作都是通过连接通道完成的。一个通道类似于OIO中两个流的结合体,既可以从通道读取数据,也可以向通道写入数据。

应用程序与通道的交互主要是进行数据的读取和写入。为了完成 NIO的非阻塞读写操作,NIO为我们准备了 Buffer。所谓通道的读取,就是将数据从通道读取到缓冲区中;所谓通道的写入,就是将数据从缓冲区写入通道中。缓冲区的使用是面向流进行读写操作的OIO所没有的,也是NIO非阻塞的重要前提和基础之 一。


啥是选择器?

Java 的 Selector 是一个 API 名称,它在不同的操作系统可能具有不同的具体实现。这里我们以 netty 的实现为例,具体拆解说清楚这个 Selector 的含义。

Netty 通过统一的 Java Selector 接口抽象了底层 I/O 多路复用,Selector 是 Java 层面的抽象,在 Linux 平台下的默认实现是 EpollSelector,底层使用的是 epoll(而非过时的 select)。

三层架构:从抽象到实现:

1
2
3
4
5
6
7
8
9
10
Netty 应用层


Java NIO Selector (java.nio.channels.Selector) ←─ 这是“抽象”


平台特定实现
├── Linux: sun.nio.ch.EPollSelectorImpl ←─ 底层是 epoll
├── macOS: sun.nio.ch.KQueueSelectorImpl ←─ 底层是 kqueue
└── Windows: sun.nio.ch.WindowsSelectorImpl ←─ 底层是 IOCP (Windows 版)

Netty 的 Selector 选择策略:

1
2
3
4
5
6
7
8
9
10
// Netty 的默认 SelectorProvider 选择逻辑
if (isWindows()) {
return new WindowsSelectorProvider(); // Windows: 使用IOCP
} else if (isLinux()) {
// 如果 epoll 可用,则使用 EpollSelectorProvider
if (Epoll.isAvailable()) {
return new EpollSelectorProvider(); // Linux: 使用 epoll
}
}
return SelectorProvider.provider(); // 回退到 JDK 默认(可能是 poll 或 select)

实际上,Netty 不满足于 JDK 的默认实现,还提供了性能更强的原生实现:

1
2
3
4
5
6
// 在 Linux 上,Netty 提供了两个传输选项:
EventLoopGroup group;
// 1. 使用 JDK 的 NIO(底层是 epoll,但有 JDK 的包装开销)
group = new NioEventLoopGroup();
// 2. 使用 Netty 的原生 Epoll(绕过 JDK,直接调用 epoll 系统调用)
group = new EpollEventLoopGroup(); // 直接通过JNI调用 epoll_ctl(), epoll_wait(),性能更好!

你可以通过以下命令查看 Netty 实际使用的 Selector:

1
2
3
4
5
# 运行你的 Netty 应用时添加 JVM 参数
-Djava.nio.channels.spi.SelectorProvider=sun.nio.ch.EPollSelectorProvider

# 或者在代码中打印:(在 Linux 上输出: sun.nio.ch.EPollSelectorProvider)
System.out.println(SelectorProvider.provider().getClass().getName());

具体的调用过程示意图:


缓冲区(buffer)

buffer的分类

NIO的Buffer本质上是一个内存块,既可以写入数据,也可以从中读取数据。Java NIO中代表缓冲区的Buffer类是一个抽象类,位于 java.nio包中。Buffer内部是一个内存块(数组),注意作为读写缓冲区的数 组,并没有定义在Buffer类中,而是定义在各具体子类中。

与普通的内存块(Java 数组)不同的是:NIO Buffer对象提供了一组比较有效的方法,用来进行写入和读取的交替访问。在NIO中, 有8种缓冲区类:

  • ByteBuffer
  • ShortBuffer
  • IntBuffer
  • LongBuffer
  • DubbleBuffer
  • FloatBuffer
  • CharBuffer
  • MappedByteBuffer

前7种Buffer类型覆盖了能在IO中传输的所有Java 基本数据类型,第8种类型是一种专门用于内存映射的ByteBuffer类 型。不同的Buffer子类可以操作的数据类型能够通过名称进行判断, 比如 IntBuffer 只能操作 Integer 类型的对象。最常用的是ByteBuffer。上述缓冲区的管理方式几乎一致,都是通过allocate()获取缓冲区。


buffer的基本方法

缓冲区存取数据的两个核心方法:

  • put 存入数据到buffer中
  • get 获取buffer中的数据

要想对缓冲区的数据进行正确的存取,必须先要熟悉buffer缓冲区的几个核心属性,如下:

  • capacity 表示缓冲区中最大存储数据的容量,一旦声明不能改变。注意这里的capacity并不是指内部的内存块byte[]数组的字节数量,而是指能写入的数据对象的最大限制数量,下面的参数都是同理。
  • limit 表示缓冲区中可以操作数据的容量大小(limit后面的数据不能进行读写,默认初始大小与capacity相等);
  • position 表示缓冲区中正在操作数据的位置
  • mark 表示通过mark方法标记的当前position的位置,可以通过reset方法重置buffer的position到mark的位置。

综上可以看出 0 <= mark <= position <= limit <= capacity。请看如下简单的测试方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
/**
* 测试 allocate、put、flip、get、rewind、clear
*/
@Test
public void test01() {
// 1. 分配一个指定大小的缓冲区
ByteBuffer buffer = ByteBuffer.allocate(10);
System.out.println(buffer.position()); // 0
System.out.println(buffer.limit()); // 10
System.out.println(buffer.capacity()); // 10

// 2. 利用put将数据存入缓冲区
String str = "abcde";
buffer.put(str.getBytes());
System.out.println(buffer.position()); // 5
System.out.println(buffer.limit()); // 10
System.out.println(buffer.capacity()); // 10

// 3. 调用flip方法切换到读数据模式(flip将limit值为当前position的值,然后使position归0)
buffer.flip();
System.out.println(buffer.position()); // 0
System.out.println(buffer.limit()); // 5
System.out.println(buffer.capacity()); // 10

// 在读取完成后,如何再一次将缓冲区切换成写模式呢?
// 答案是:可以调用Buffer.clear()清空或者Buffer.compact() 压缩方法,它们可以将缓冲区转换为写模式。

// 4. 利用get方法读取缓冲区数据
byte[] bs = new byte[buffer.limit()];
ByteBuffer buffer2 = buffer.get(bs);
System.out.println(new String(bs)); // abcde
System.out.println(buffer2==buffer); // true
System.out.println(buffer.position()); // 5
System.out.println(buffer.limit()); // 5
System.out.println(buffer.capacity()); // 10

// 5. 倒带,使buffer可以重复读取(rewind使position归0)
buffer.rewind();
System.out.println(buffer.position()); // 0
System.out.println(buffer.limit()); // 5
System.out.println(buffer.capacity()); // 10

// 6. 清空buffer(注意清空缓冲区后buffer中原来的数据依赖存在)
buffer.clear();
System.out.println(buffer.position()); // 0
System.out.println(buffer.limit()); // 10
System.out.println(buffer.capacity()); // 10
}

mark 和 reset 两个方法是配套使用的:Buffer.mark() 方法将当前 position 的值保存起来放在 mark 属性中,让 mark 属性记住这个临时位置;然后可以调用Buffer.reset()方法将mark的值恢复到 position中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
/**
* 测试 mark、reset
*/
@Test
public void test2() {
// 1. 分配一个指定大小的缓冲区
ByteBuffer buffer = ByteBuffer.allocate(10);
System.out.println(buffer.position()); // 0
System.out.println(buffer.limit()); // 10
System.out.println(buffer.capacity()); // 10

// 2. 利用put将数据存入缓冲区
String str = "abcde";
buffer.put(str.getBytes());
System.out.println(buffer.position()); // 5
System.out.println(buffer.limit()); // 10
System.out.println(buffer.capacity()); // 10

// get方法读取缓冲区数据
buffer.flip();
byte[] bs = new byte[buffer.limit()];
buffer.get(bs, 0, 2);
System.out.println(new String(bs, 0, 2)); // ab
System.out.println(buffer.position()); // 2

// 通过mark标记当前position的位置
buffer.mark();
buffer.get(bs, 2, 2);
System.out.println(new String(bs, 2, 2)); // cd
System.out.println(buffer.position()); // 4

// 通过reset恢复到刚才mark的位置
buffer.reset();
System.out.println(buffer.position()); // 2

// 查看缓冲区中还有没有剩余的可操作数据
if (buffer.hasRemaining()) { // true
System.out.println(buffer.remaining()); // 3
}
}

使用Buffer类的基本步骤:

  • 使用创建子类实例对象的allocate()方法创建一个Buffer类 的实例对象。
  • 调用put()方法将数据写入缓冲区中。
  • 写入完成后,在开始读取数据前调用Buffer.flip()方法, 将缓冲区转换为读模式。
  • 调用get()方法,可以从缓冲区中读取数据。
  • 读取完成后,调用Buffer.clear()方法或Buffer.compact() 方法,将缓冲区转换为写模式,可以继续写入。


几个基本方法的源码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
/**
* 切换读模式:
* 首先将写模式下的缓冲区中内容的最后写入位置position值作为读模式下的limit上限值。
* 其次把读的起始位置position的值设为0,表示从头开始读。
* 最后清除之前的mark标记,因为mark保存的是写模式下的临时位置,发生模式翻转后,如果继续使用旧的mark标记,就会造成位置混乱。
*/
public Buffer flip() {
limit = position;
position = 0;
mark = -1;
return this;
}

/**
* 倒带重读:
* 首先 position 重置为0,所以可以重读缓冲区中的所有数据
* 其次 mark 被清理,表示之前的临时位置不能再用了。
* 注意:它与flip的唯一区别就是不改变limit!
*/
public Buffer rewind() {
position = 0;
mark = -1;
return this;
}

/**
* 读模式下判断是否存在剩余未读,通常用于遍历读取
*/
public final boolean hasRemaining() {
return position < limit;
}

/**
* 计算还有多少未读
*/
public final int remaining() {
int rem = limit - position;
return rem > 0 ? rem : 0;
}

/**
* 打桩:
* 将当前position的值保存起来放在mark属性中
*/
public Buffer mark() {
mark = position;
return this;
}

/**
* 回桩:
* 将mark的值恢复到position中
*/
public Buffer reset() {
int m = mark;
if (m < 0)
throw new InvalidMarkException();
position = m;
return this;
}

/**
* 清空回到初始位置
*/
public Buffer clear() {
position = 0;
limit = capacity;
mark = -1;
return this;
}


buffer的非线程安全

Buffer及其子类(ByteBuffer、CharBuffer等)都是非线程安全的类。这么设计完全是出于性能考虑。NIO的设计哲学:

  • 单线程处理单连接:在 Reactor 模式下,一个 Channel 只会被一个 EventLoop 线程处理
  • 零拷贝优化:DirectByteBuffer 直接操作堆外内存,减少 JVM 开销
  • 极致性能:为了减少 1 微秒的延迟,放弃所有线程安全开销

基于这一点,生产环境 Netty 的标准正确用法应该是:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// 每个 Channel 绑定一个 EventLoop,Buffer 不跨线程
public class MyHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
// msg 是 ByteBuf,只在当前 EventLoop 线程访问
ByteBuf buf = (ByteBuf) msg;
try {
// 安全:这个 handler 只在一个线程执行
processBuffer(buf);
} finally {
buf.release(); // 引用计数回收
}
}
}

线程间传递的解决方案:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
// 方案1:深度拷贝(简单但耗内存)
ByteBuffer original = ByteBuffer.allocate(1024);
ByteBuffer copy = ByteBuffer.allocate(original.capacity());
copy.put(original);
original.flip();
copy.flip();


// 方案2:消息传递 + 引用计数(Netty ByteBuf)
ByteBuf buf = Unpooled.buffer(1024);
buf.writeBytes("Hello".getBytes());
System.out.println("初始引用计数: " + buf.refCnt()); // 输出: 1
buf.retain(); // 增加引用计数到2(告诉系统 “还有一个线程要用”)
otherThreadExecutor.execute(() -> { // 线程2:工作线程
try {
System.out.println("工作线程读取: " + buf.toString(StandardCharsets.UTF_8));
System.out.println("工作线程引用计数: " + buf.refCnt()); // 输出: 2
} finally {
buf.release(); // 减少引用计数到 1
}
});
buf.release(); // 主线程用完。减少引用计数到 0 → 触发真正释放内存


// 方案3:线程本地存储
private static final ThreadLocal<ByteBuffer> BUFFER_LOCAL =
ThreadLocal.withInitial(() -> ByteBuffer.allocateDirect(1024)); // 堆外内存,性能更好
// 注意内存泄漏风险(尤其在线程池中),使用后一定要清理,防止内存泄漏
pool.submit(() -> {
try {
ByteBuffer buffer = BUFFER_LOCAL.get();
// 使用 buffer...
} finally {
BUFFER_LOCAL.remove(); // 防止内存泄漏!
}
});

现代框架的实际组合使用:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
// Netty 中的真实模式:两者结合
public class AdvancedHandler extends ChannelInboundHandlerAdapter {
// 小缓冲区用 ThreadLocal
private static final ThreadLocal<byte[]> TEMP_BUFFER = ThreadLocal.withInitial(() -> new byte[256]);

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ByteBuf data = (ByteBuf) msg; // 大数据用引用计数

// 1. 大数据的引用计数传递
data.retain();
businessPool.submit(() -> {
try {
processBigData(data);
} finally {
data.release();
}
});

// 2. 小数据的 ThreadLocal 复用
byte[] temp = TEMP_BUFFER.get();
data.getBytes(0, temp, 0, Math.min(256, data.readableBytes()));
processMetadata(temp);
}
}


直接和非直接缓冲区

这两类缓冲的定义如下:

  • 非直接缓冲区:通过 allocate() 方法分配的缓冲区,将缓冲区建立在 JVM的内存中,传统I/O操作使用的缓冲区以及通过 allocate() 创建的缓冲区都属于非直接缓冲区;
  • 直接缓冲区:通过 allocateDirect() 方法分配的缓冲区,将缓冲区建立在 操作系统的物理内存中,直接缓冲区方式通过在物理内存中创建映射文件减少了中间的 copy 步骤,因而I/O效率较高,但也增加了应用程序的不稳定性;
  • 直接字节缓冲区(注意只是字节缓冲区)还可以通过 FileChannel.map() 方法将文件区域直接映射到内存中来创建,该方法返回 MappedByteBuffer。Java平台的实现有助于通过JNI从本机代码创建直接字节缓冲区。如果以上这些缓冲区中的某个缓冲区实例指的是不可访问的区域,则试图访问该区域不会更改缓冲区的内容,并且将会在访问期间或稍后的某个时间导致抛出不确定的异常。字节缓冲区是直接还是非直接缓冲区,可以通过调用 isDirect() 方法来确定,以便在性能关键性代码中执行显示的缓冲区管理;

注意,创建直接缓冲区的消耗要大于非直接缓冲区,直接缓冲区的内容可以驻留在常规的垃圾回收堆外,它不受JVM控制,而是受控于操作系统,因此直接缓冲区对应用程序内存造成的影响不明显。建议将直接缓冲区分配给那些易受基础系统的本机I/O操作影响的大型、持久的对象。一般最好仅在直接缓冲区能在程序性能方面带来明显好处时分配它们。

简单测试及源码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
@Test
public void test03() {
ByteBuffer buffer1 = ByteBuffer.allocate(10);
System.out.println(buffer1.isDirect()); // false
ByteBuffer buffer2 = ByteBuffer.allocateDirect(10);
System.out.println(buffer2.isDirect()); // true
}

// allocate源码
public static ByteBuffer allocate(int capacity) {
if (capacity < 0)
throw new IllegalArgumentException();
return new HeapByteBuffer(capacity, capacity);
}

// allocateDrect源码
public static ByteBuffer allocateDirect(int capacity) {
return new DirectByteBuffer(capacity);
}
DirectByteBuffer(int cap) {
super(-1, 0, cap, cap);
boolean pa = VM.isDirectMemoryPageAligned();
int ps = Bits.pageSize();
long size = Math.max(1L, (long)cap + (pa ? ps : 0));
Bits.reserveMemory(size, cap);

long base = 0;
try {
base = unsafe.allocateMemory(size);
} catch (OutOfMemoryError x) {
Bits.unreserveMemory(size, cap);
throw x;
}
unsafe.setMemory(base, size, (byte) 0);
if (pa && (base % ps != 0)) {
// Round up to page boundary
address = base + ps - (base & (ps - 1));
} else {
address = base;
}
cleaner = Cleaner.create(this, new Deallocator(base, size, cap));
att = null;
}


直接内存的分配和释放

内存的分配原理

1
2
// 当我们调用
ByteBuffer buffer = ByteBuffer.allocateDirect(1024 * 1024); // 1MB

实际发生了以下几步:

关键点:

  • 两段内存:一个小ByteBuffer对象(在堆内) + 一大块直接内存(在堆外)
  • 引用关系:ByteBuffer通过 address 字段保存直接内存地址
  • 清理钩子:创建 Cleaner(PhantomReference子类)用于释放内存


性能对照实验

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
/**
* 直接内存 vs 堆内存性能对比
*/
public static void performanceComparison() {
System.out.println("=== 示例:性能对比 ===");

int size = 10 * 1024 * 1024; // 10MB
int iterations = 100;

// 测试直接内存
long directStart = System.currentTimeMillis();
ByteBuffer directBuffer = ByteBuffer.allocateDirect(size);

for (int i = 0; i < iterations; i++) {
// 写入数据
for (int j = 0; j < size; j++) {
directBuffer.put((byte) (j & 0xFF));
}
directBuffer.flip();

// 读取数据
while (directBuffer.hasRemaining()) {
directBuffer.get();
}

directBuffer.clear(); // 准备下一轮
}
long directTime = System.currentTimeMillis() - directStart;

// 测试堆内存
long heapStart = System.currentTimeMillis();
ByteBuffer heapBuffer = ByteBuffer.allocate(size);

for (int i = 0; i < iterations; i++) {
// 写入数据
for (int j = 0; j < size; j++) {
heapBuffer.put((byte) (j & 0xFF));
}
heapBuffer.flip();

// 读取数据
while (heapBuffer.hasRemaining()) {
heapBuffer.get();
}

heapBuffer.clear();
}
long heapTime = System.currentTimeMillis() - heapStart;

System.out.printf("直接内存耗时: %,d ms%n", directTime);
System.out.printf("堆内存耗时: %,d ms%n", heapTime);
System.out.printf("性能差异: %.2fx%n", (double) heapTime / directTime);
}
1
2
3
4
=== 示例2:性能对比 ===
直接内存耗时: 1,092 ms
堆内存耗时: 8,748 ms
性能差异: 8.01x


内存的释放或管理

方式1:自动GC释放(主要方式,但不可靠)

1
2
3
4
5
6
7
8
9
10
ByteBuffer buffer = ByteBuffer.allocateDirect(100);
buffer = null; // 失去引用

// GC发生时...
// 1. ByteBuffer对象被标记为不可达
// 2. Cleaner对象(虚引用)被加入引用队列
// 3. ReferenceHandler线程处理引用队列
// 4. 调用Cleaner.clean() → 释放直接内存
// 5. 最后回收ByteBuffer对象本身
System.gc(); // 提示GC,但不保证立即执行!

方式2:显式调用Cleaner

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
ByteBuffer buffer = ByteBuffer.allocateDirect(1024 * 1024);
// 使用buffer...

// 方法1:通过反射调用 Cleaner
if (buffer.isDirect()) {
try {
Method cleanerMethod = buffer.getClass().getMethod("cleaner");
cleanerMethod.setAccessible(true);
Object cleaner = cleanerMethod.invoke(buffer);
if (cleaner != null) {
Method cleanMethod = cleaner.getClass().getMethod("clean");
cleanMethod.invoke(cleaner);
}
} catch (Exception e) {
// 处理异常
}
}

// 方法2:使用Unsafe(更底层)
sun.misc.Unsafe unsafe = getUnsafe();
long address = ((sun.nio.ch.DirectBuffer) buffer).address();
unsafe.freeMemory(address); // 直接释放

// 方法3:Netty的方式
if (buffer instanceof sun.nio.ch.DirectBuffer) {
((sun.nio.ch.DirectBuffer) buffer).cleaner().clean();
}

更加推荐的使用方式:内存池(如 Netty)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
// 错误:频繁分配释放
public ByteBuffer processRequest(byte[] data) {
ByteBuffer buffer = ByteBuffer.allocateDirect(data.length);
buffer.put(data);
return buffer; // 调用者可能忘记释放
}

// 正确:使用内存池
public class DirectMemoryPool {
private final Queue<ByteBuffer> pool = new ConcurrentLinkedQueue<>();
private final int bufferSize;

public ByteBuffer borrowBuffer() {
ByteBuffer buffer = pool.poll();
if (buffer == null) {
buffer = ByteBuffer.allocateDirect(bufferSize);
}
buffer.clear();
return buffer;
}

public void returnBuffer(ByteBuffer buffer) {
if (buffer != null && buffer.isDirect()) {
buffer.clear();
pool.offer(buffer);
}
}

public void destroy() {
for (ByteBuffer buffer : pool) {
if (buffer instanceof sun.nio.ch.DirectBuffer) {
((sun.nio.ch.DirectBuffer) buffer).cleaner().clean();
}
}
pool.clear();
}
}

防御性编程模板

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
public class SafeDirectBuffer implements AutoCloseable {
private ByteBuffer buffer;
private boolean closed = false;

public SafeDirectBuffer(int capacity) {
this.buffer = ByteBuffer.allocateDirect(capacity);
}

public ByteBuffer getBuffer() {
if (closed) {
throw new IllegalStateException("Buffer已关闭");
}
return buffer;
}

@Override
public void close() {
if (!closed && buffer != null) {
try {
// 尝试通过Cleaner释放
if (buffer instanceof sun.nio.ch.DirectBuffer) {
Cleaner cleaner = ((sun.nio.ch.DirectBuffer) buffer).cleaner();
if (cleaner != null) {
cleaner.clean();
}
}
} finally {
buffer = null;
closed = true;
}
}
}

@Override
protected void finalize() throws Throwable {
try {
close(); // 最后的安全网
} finally {
super.finalize();
}
}
}

// 使用示例:try-with-resources确保释放
try (SafeDirectBuffer safeBuffer = new SafeDirectBuffer(1024)) {
ByteBuffer buffer = safeBuffer.getBuffer();
// 使用buffer...
} // 自动调用close()释放内存


通道(channel)

啥是通道?

前面提到,Java NIO中一个 socket 连接使用一个Channel来表示。 从更广泛的层面来说,一个通道可以表示一个底层的文件描述符,例 如硬件设备、文件、网络连接等。然而,远不止如此,Java NIO的通道可以更加细化。例如,不同的网络传输协议类型,在 Java 中都有不同的NIO Channel实现。


有哪些重要的通道?

这里不对Java NIO的全部通道类型进行过多的描述,仅着重介绍其中最为重要的四种Channel实现:FileChannel、SocketChannel、 ServerSocketChannel、DatagramChannel。

  • FileChannel :本地文件传输通道,用于本地文件的数据读写。
  • SocketChannel:套接字通道,用于套接字TCP连接的数据读 写。
  • ServerSocketChannel:服务端器监听通道,允许我们监听TCP连接请求,为每个监听到的请求创建一个 SocketChannel通道。
  • DatagramChannel:UDP协议传输通道,用于UDP的数据读写。


FileChannel 介绍

通道的获取

获取通道的三种方式:

  1. Java针对支持通道的类提供了 getChannel() 方法来获取通道,这些类有 FileInputStream、FileOutputStream、RandomAccessFile、Socket、ServerSocket、DatagramSocket等;
  2. NIO.2(JDK1.7+)可以通过各个通道的实现类提供的静态方法 open() 来获取通道;
  3. NIO.2(JDK1.7+)也可以通过 Files.newByteChannel 方法来获取通道;

使用 channel通道+非直接缓冲区 完成文件的复制:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
@Test
public void test01() {
FileInputStream fis = null;
FileOutputStream fos = null;
FileChannel fisChannel = null;
FileChannel fosChannel = null;
try {
// 1. 获取channel
fis = new FileInputStream("1.jpg");
fos = new FileOutputStream("2.jpg");
fisChannel = fis.getChannel();
fosChannel = fos.getChannel();

// 2. 通过channel和buffer完成数据的传输
// 下面的数据传输逻辑可以用一句代码搞定
// long size = fosChannel.transferFrom(fisChannel, 0, fisChannel.size()); // 或
// long size = fisChannel.transferTo(0, fisChannel.size(), fosChannel);
ByteBuffer buffer = ByteBuffer.allocate(1024);
while (fisChannel.read(buffer) != -1) {
buffer.flip(); // 切换成读数据模式
fosChannel.write(buffer);
buffer.clear(); // 清空缓冲区
}
} catch (IOException e) {
e.printStackTrace();
} finally {
// 关闭通道和流
if (fosChannel != null) {
try { fosChannel.close(); } catch (IOException e) { e.printStackTrace(); }
}
if (fisChannel != null) {
try { fisChannel.close(); } catch (IOException e) { e.printStackTrace(); }
}
if (fos != null) {
try { fos.close(); } catch (IOException e) { e.printStackTrace(); }
}
if (fis != null) {
try { fis.close(); } catch (IOException e) { e.printStackTrace(); }
}
}
}


数据的读取和写入

在大部分应用场景中,从通道读取数据都会调用通道的int read(ByteBuffer buf)方法,它把从通道读取的数据写入ByteBuffer 缓冲区,并且返回读取的数据量。

1
2
3
4
5
6
7
8
9
10
11
12
RandomAccessFile file = new RandomAccessFile(fileName, "rw");
// 获取通道(可读可写)
FileChannel channel = file.getChannel();
// 获取一个字节缓冲区
ByteBuffer buf = ByteBuffer.allocate(CAPACITY);
int length = -1;

// 调用通道的read()方法,读取数据并写入字节类型的缓冲区
// 对于通道来说是读模式,对于ByteBuffer缓冲区来说是写入数据(处于写模式)
while ((length = channel.read(buf)) != -1) {//
// ...
}

把数据写入通道,在大部分应用场景中都会调用通道的 write(ByteBuffer)方法,此方法的参数是一个ByteBuffer缓冲区实 例,是待写数据的来源。write(ByteBuffer)方法的作用是从ByteBuffer缓冲区中读取数 据,然后写入通道自身,而返回值是写入成功的字节数。

1
2
3
4
5
6
7
8
9
// 如果buf处于写模式(如刚写完数据),需要翻转buf,使其变成读模式
buf.flip();
int outlength = 0;

// 调用write()方法,将buf的数据写入通道
// 对于从到来说是写模式,对于 buf 缓冲区来说是读取数据(出于读模式)
while ((outlength = outchannel.write(buf)) != 0) {
System.out.println("写入的字节数:" + outlength);
}


强制刷新到磁盘

在将缓冲区写入通道时,出于性能的原因,操作系统不可能每次 都实时地将写入数据落地(或刷新)到磁盘,完成最终的数据保存。在将缓冲区数据写入通道时,要保证数据能写入磁盘,可以在写 入后调用一下FileChannel的force()方法。

1
channel.force(true);


通道的关闭

当通道使用完成后,必须将其关闭。关闭非常简单,调用close() 方法即可。

1
channel.close();


文件的复制案例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
public static void testCopy01() throws IOException {
Instant startTime = Instant.now();
Path source = Paths.get("/Users/xxx/Downloads/111.png");
Path target = Paths.get("/Users/xxx/Downloads/222.png");

try (FileChannel finChannel = FileChannel.open(source, StandardOpenOption.READ);
FileChannel foutChannel = FileChannel.open(target,
StandardOpenOption.READ, StandardOpenOption.WRITE,
StandardOpenOption.CREATE, StandardOpenOption.TRUNCATE_EXISTING)) {

// 方法1:零拷贝传输(最佳性能)
long fileSize = finChannel.size();
long transferred = finChannel.transferTo(0, fileSize, foutChannel);
System.out.printf("零拷贝传输: %,d bytes%n", transferred);
}

Instant endTime = Instant.now();
long durationMs = Duration.between(startTime, endTime).toMillis();
System.out.printf("耗时: %,d ms%n", durationMs);
}

public static void testCopy02() {
// 演示代码。实际生产还是建议使用效率更高的 transferFrom 或 transferTo!
File srcFile = new File("/Users/xxx/Downloads/111.png");
File destFile = new File("/Users/xxx/Downloads/222.png");
try {
// 如果目标文件不存在,则新建
if (!destFile.exists()) {
destFile.createNewFile();
}
long startTime = System.currentTimeMillis();
FileInputStream fis = null;
FileOutputStream fos = null;
FileChannel inChannel = null; // 输入通道
FileChannel outchannel = null; // 输出通道
try {
fis = new FileInputStream(srcFile);
fos = new FileOutputStream(destFile);
inChannel = fis.getChannel();
outchannel = fos.getChannel();
int length = -1;
// 新建buf,处于写模式
ByteBuffer buf = ByteBuffer.allocate(1024);
// 从输入通道读取到buf
while ((length = inChannel.read(buf)) != -1) {
// buf第一次模式切换:翻转buf,从写模式变成读模式
buf.flip();
int outlength = 0;
// 将buf写入输出的通道
while ((outlength = outchannel.write(buf)) != 0) {
System.out.println("写入的字节数:" + outlength);
}
// buf第二次模式切换:清除buf,变成写模式
buf.clear();
}
// 强制刷新到磁盘
outchannel.force(true);
} finally {
// 关闭所有的可关闭对象
IOUtils.closeQuietly(outchannel);
IOUtils.closeQuietly(fos);
IOUtils.closeQuietly(inChannel);
IOUtils.closeQuietly(fis);
}
long endTime = System.currentTimeMillis();
System.out.println("base复制毫秒数:" + (endTime - startTime));
} catch (IOException e) {
e.printStackTrace();
}
}

通道之间的数据传输可以直接更方便地使用:

1
2
long size1 = fosChannel.transferFrom(fisChannel, 0, fisChannel.size()); 
long size2 = fisChannel.transferTo(0, fisChannel.size(), fosChannel);

在实际工程中,transferTo 或 transferFrom 可能需要放在循环中多次调用。虽然它们的名称听起来像是一个“原子操作”,但由于它们直接与操作系统的内核缓冲区(Socket Buffer)和文件系统缓冲区(Page Cache)打交道,它们受到的物理限制是完全一样的。

transferTo 的底层在 Linux 上对应的是 sendfile 系统调用。数据只在内核态内部流动(从文件系统缓存直接到网络协议栈),它之所快,本质上是减少了数据拷贝的次数和上下文切换的开销。在传输时之所以会中断或只完成一部分,主要有以下三个原因:

  • Socket 缓冲区满: TCP 传输是有流控的。如果接收方处理太慢,或者网络带宽达到上限,发送方的 Socket 发送缓冲区就会被填满。此时,内核会停止从文件拷贝数据,transferTo 只能返回当前实际已经发送的字节数。
  • 操作系统限制: 许多操作系统(尤其是 Linux)对 sendfile 单次传输的数据量有上限(通常是 2GB)。如果文件大于 2GB,单次调用必然无法完成。
  • 内存压力: 内核在做数据交换时,也需要考虑页缓存(Page Cache)的调度,可能会因为资源波动而提前返回。

工业级的 “零拷贝” 循环写法:为了保证文件能完整传输,你必须记录当前的 Position(位置),并根据每次返回的 Transferred(已传输量)动态调整下一次的起点。

1
2
3
4
5
6
7
8
long pos = 0;
long size = fileChannel.size();
while (pos < size) {
// 问:能发多少? 答:看网卡缓存够不够
long sent = fileChannel.transferTo(pos, size - pos, socketChannel);
if (sent <= 0) break; // 网络断开或缓冲区异常
pos += sent;
}

transferFrom 的作用是从源通道读取数据并写入当前文件通道。它之所以不能一次性完成,原因如下:

  • 源端数据还没到齐: 如果 srcChannel 是一个网络 Socket,内核缓冲区里此时可能只有 4KB 数据,即使你请求 count 为 10MB,它也只能把现有的 4KB 给你,然后立即返回 4096。
  • 目标端写入受阻: 如果磁盘 I/O 繁忙,或者触发了操作系统的同步刷盘机制,内核可能会暂停当前的写入操作,导致只写入了一部分数据。
  • 协议栈限制: 就像 transferTo 受限于发送缓冲区一样,transferFrom 受限于接收缓冲区。
1
2
3
4
5
6
7
8
long pos = 0;
long count = expectedSize; // 预期的文件总大小
while (pos < count) {
// 问:能读多少? 答:看网卡现在到了多少
long received = fileChannel.transferFrom(socketChannel, pos, count - pos);
if (received <= 0) break; // 对端关闭或无数据
pos += received;
}

你可能会问:“既然都要写循环,Java 为什么不直接在底层帮我循环完再返回?” 这是为了非阻塞的灵活性:

  • 控制权: 如果 transferTo 在底层死等 1GB 传输完才返回,那你的线程就会被阻塞很久。在高性能服务器(如 Netty)中,我们希望线程能立刻返回,去处理其他 Socket 的事件,而不是卡在某一个文件的拷贝上。
  • 超时与取消: 交给用户写循环,你可以随时插入逻辑:比如传输了 10 秒还没完,就主动断开;或者在循环里更新进度条。


SocketChannel 介绍

在NIO中,涉及网络连接的通道有两个:一个是SocketChannel, 负责连接的数据传输;另一个是ServerSocketChannel,负责连接的监听。其中,NIO中的 SocketChannel 传输通道与 OIO 中的 Socket 类对应, NIO 中的 ServerSocketChannel 监听通道对应于 OIO 中的 ServerSocket 类。ServerSocketChannel 仅应用于服务端,而 SocketChannel 同时处于服务端和客户端。所以,对于一个连接,两端都有一个负责传输的 SocketChannel。无论是 ServerSocketChannel 还是 SocketChannel,都支持阻塞和非阻塞两种模式。

  • socketChannel.configureBlocking(false):设置为非阻塞模式。
  • socketChannel.configureBlocking(true):设置为阻塞模式。

在阻塞模式下,SocketChannel 的连接、读、写操作都是同步阻塞式的,在效率上与 Java OIO 面向流的阻塞式读写操作相同。因此,在 这里不介绍阻塞模式下通道的具体操作。在非阻塞模式下,通道的操作是异步、高效的,这也是相对于传统OIO的优势所在。


通道的获取

1
2
3
4
5
6
// 获得一个套接字传输通道
SocketChannel socketChannel = SocketChannel.open();
// 设置为非阻塞模式
socketChannel.configureBlocking(false);
// 对服务器的IP和端口发起连接
socketChannel.connect(new InetSocketAddress("127.0.0.1"80));

在非阻塞情况下,与服务器的连接可能还没有真正建立,socketChannel.connect() 方法就返回了,因此需要不断地自旋,检查当前是否连接到了主机:

1
2
3
while(!socketChannel.finishConnect() ){
//不断地自旋、等待,或者做一些其他的事情
}

在连接建立的事件到来时,服务端的ServerSocketChannel能成功地查询出这个新连接事件,并且通过调用服务端ServerSocketChannel 监听套接字的accept()方法来获取新连接的套接字通道:

1
2
3
4
5
6
// 新连接事件到来,首先通过事件获取服务器监听通道
ServerSocketChannel server = (ServerSocketChannel) key.channel();
// 获取新连接的套接字通道
SocketChannel socketChannel = server.accept();
// 设置为非阻塞模式
socketChannel.configureBlocking(false);


数据的读取和写入

当SocketChannel传输通道可读时,可以从SocketChannel读取数据,具体方法与前面的文件通道读取方法是相同的。调用read()方 法,将数据读入缓冲区ByteBuffer。在读取时,因为是异步的,所以我们必须检查read()的返回值, 以便判断当前是否读取到了数据。read()方法的返回值是读取的字节 数,如果是-1,那么表示读取到对方的输出结束标志,即对方已经输 出结束,准备关闭连接。实际上,通过read()方法读数据本身是很简 单的,比较困难的是在非阻塞模式下如何知道通道何时是可读的。这 需要用到NIO的新组件——Selector通道选择器。

1
2
ByteBufferbuf = ByteBuffer.allocate(1024);
int bytesRead = socketChannel.read(buf);

和前面把数据写入FileChannel 一样,大部分应用场景都会调用通道的 write(ByteBufferbuf) 方法:

1
2
3
// 写入前需要读取缓冲区,要求ByteBuffer是读模式
buffer.flip();
socketChannel.write(buffer);


通道的关闭

在关闭 SocketChannel 传输通道前,如果传输通道用来写入数据, 则建议调用一次 shutdownOutput() 终止输出方法,向对方发送一个输出的结束标志(-1)。然后调用 socketChannel.close() 方法,关闭套接字连接。

1
2
3
4
5
6
// 调用终止输出方法,向对方发送一个输出的结束标志(半关闭 Half-Close,告诉服务器 "我不再发送,但还能接收")
socketChannel.shutdownOutput();
// 还可以读取服务器响应
// ...
// 关闭套接字连接
IOUtils.closeQuietly(socketChannel);


发送文件的案例

客户端:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;

public class NioTCPClient {

public static void main(String[] args) {
new NioTCPClient().sendFile("/Users/xxx/Downloads/test.png", "127.0.0.1", 9000);
}

public void sendFile(String srcPath, String host, int port) {
File file = new File(srcPath);
if (!file.exists()) {
System.err.println("文件不存在: " + srcPath);
return;
}

// 使用 try-with-resources 自动关闭资源
try (FileInputStream fis = new FileInputStream(file);
FileChannel fileChannel = fis.getChannel();
SocketChannel socketChannel = SocketChannel.open()) {

// 1. 建立连接
socketChannel.connect(new InetSocketAddress(host, port));
socketChannel.configureBlocking(true); // 发送端通常建议设为阻塞,简化逻辑
System.out.println("成功连接服务端: " + host + ":" + port);

// 2. 发送协议头 (文件名长度 + 文件名 + 文件长度)
byte[] fileNameBytes = file.getName().getBytes(StandardCharsets.UTF_8);
ByteBuffer header = ByteBuffer.allocate(4 + fileNameBytes.length + 8);

header.putInt(fileNameBytes.length); // 4字节:文件名长度
header.put(fileNameBytes); // N字节:文件名内容
header.putLong(file.length()); // 8字节:文件总长度

header.flip();
while (header.hasRemaining()) {
socketChannel.write(header);
}
System.out.println("协议头发送完毕,开始传输内容...");

// 3. 零拷贝传输文件内容 (sendfile 调用)
long position = 0;
long size = file.length();
while (position < size) {
// 参数1:从文件的哪个位置开始
// 参数2:本次还剩多少没传
// 参数3:目标通道
long transferred = fileChannel.transferTo(position, size - position, socketChannel);
if (transferred <= 0) {
break; // 如果返回0,说明 Socket 缓冲区暂时满了,或者连接已断开
}
position += transferred;
System.out.printf("传输进度: %.2f%%\r", (position * 100.0 / size));
}

System.out.println("\n文件传输成功!");
socketChannel.shutdownOutput(); // 优雅关闭输出流
} catch (IOException e) {
e.printStackTrace();
}
}
}

服务端:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;

public class NioTCPServer {

public static void main(String[] args) throws IOException {
new NioTCPServer().startServer(9000);
}

// 状态机标记
private static final int STATE_READ_LEN = 0;
private static final int STATE_READ_NAME = 1;
private static final int STATE_READ_SIZE = 2;
private static final int STATE_READ_CONTENT = 3;

static class ClientSession {
int state = STATE_READ_LEN;
String fileName;
long fileLength;
long receiveLength = 0;
FileChannel outChannel;
int fileNameLen;
ByteBuffer headBuffer = ByteBuffer.allocate(8); // 用于读取int或long
}

public void startServer(int port) throws IOException {
Selector selector = Selector.open();
ServerSocketChannel serverChannel = ServerSocketChannel.open();
serverChannel.configureBlocking(false);
serverChannel.bind(new InetSocketAddress(port));
serverChannel.register(selector, SelectionKey.OP_ACCEPT);
System.out.println("服务端已启动,监听端口: " + port);

while (true) {
if (selector.select() <= 0) continue;
Iterator<SelectionKey> it = selector.selectedKeys().iterator();
while (it.hasNext()) {
SelectionKey key = it.next();
it.remove();

if (key.isAcceptable()) {
handleAccept(key, selector);
} else if (key.isReadable()) {
handleRead(key);
}
}
}
}

private void handleAccept(SelectionKey key, Selector selector) throws IOException {
ServerSocketChannel server = (ServerSocketChannel) key.channel();
SocketChannel socketChannel = server.accept();
socketChannel.configureBlocking(false);
// 为每个新连接绑定一个 Session 附件(相当于给每个网络连接 SocketChannel 绑定的一个随身储物柜)
socketChannel.register(selector, SelectionKey.OP_READ, new ClientSession());
System.out.println("客户端连接成功: " + socketChannel.getRemoteAddress());
}

private void handleRead(SelectionKey key) {
SocketChannel sc = (SocketChannel) key.channel();
ClientSession session = (ClientSession) key.attachment();
ByteBuffer buffer = ByteBuffer.allocate(8192);

try {
int num;
while ((num = sc.read(buffer)) > 0) {
buffer.flip();
processProtocol(sc, session, buffer);
buffer.clear();
}
if (num == -1) {
closeSession(key, session);
}
} catch (IOException e) {
e.printStackTrace();
closeSession(key, session);
}
}

private void processProtocol(SocketChannel sc, ClientSession session, ByteBuffer buffer) throws IOException {
while (buffer.hasRemaining()) {
switch (session.state) {
case STATE_READ_LEN: // 1. 读取文件名长度
if (buffer.remaining() >= 4) {
session.fileNameLen = buffer.getInt();
session.state = STATE_READ_NAME;
} else return; // 数据不够,等下次读
break;

case STATE_READ_NAME: // 2. 读取文件名
if (buffer.remaining() >= session.fileNameLen) {
byte[] nameBytes = new byte[session.fileNameLen];
buffer.get(nameBytes);
session.fileName = new String(nameBytes, StandardCharsets.UTF_8);

File file = new File("received_" + session.fileName);
session.outChannel = new FileOutputStream(file).getChannel();
session.state = STATE_READ_SIZE;
} else return;
break;

case STATE_READ_SIZE: // 3. 读取文件总大小
if (buffer.remaining() >= 8) {
session.fileLength = buffer.getLong();
session.state = STATE_READ_CONTENT;
System.out.println("开始接收文件: " + session.fileName + ", 大小: " + session.fileLength);
} else return;
break;

case STATE_READ_CONTENT: // 4. 写入文件内容
int remainingInFile = (int) (session.fileLength - session.receiveLength);
int writeSize = Math.min(buffer.remaining(), remainingInFile);

// 限制只读取属于当前文件的数据
ByteBuffer slice = buffer.slice();
slice.limit(writeSize);
session.outChannel.write(slice);

// 同步 buffer 的 position
buffer.position(buffer.position() + writeSize);
session.receiveLength += writeSize;

if (session.receiveLength >= session.fileLength) {
System.out.println("文件接收完成: " + session.fileName);
session.outChannel.close();
// 如果还有后续逻辑可以重置 session 状态
}
break;
}
}
}

private void closeSession(SelectionKey key, ClientSession session) {
try {
if (session.outChannel != null) session.outChannel.close();
key.channel().close();
key.cancel();
} catch (IOException e) {
e.printStackTrace();
}
}
}


DatagramChannel 介绍

在Java中使用UDP传输数据比TCP更加简单。和 socket 的 TCP 不同, UDP不是面向连接的协议。使用 UDP 时,只要知道服务器的IP和端口就 可以直接向对方发送数据。在 Java NIO 中,使用 DatagramChannel 来处理 UDP

通道的获取

获取数据报通道的方式很简单,调用 DatagramChannel 类的 open 静态方法即可。然后调用configureBlocking(false)方法,设置成非阻塞模式。

1
2
3
4
// 获取DatagramChannel
DatagramChannel channel = DatagramChannel.open();
// 设置为非阻塞模式
datagramChannel.configureBlocking(false);

如果需要接收数据,还需要调用bind()方法绑定一个数据报的监听端口,具体如下:

1
2
// 调用 bind 方法绑定一个数据报的监听端口
channel.socket().bind(new InetSocketAddress(18080));


数据的读取和写入

当 DatagramChannel 通道可读时,可以从DatagramChannel 读取数据。和前面的 SocketChannel 读取方式不同,这里不调用 read 方法, 而是调用 receive(ByteBufferbuf) 方法将数据从 DatagramChannel读入,再写入ByteBuffer 缓冲区中。通道读取 receive(ByteBufferbuf) 方法虽然读取了数据到buf缓冲 区,但是其返回值是SocketAddress类型,表示返回发送端的连接地址 (包括IP和端口)。通过receive方法读取数据非常简单,但是在非阻塞模式下如何知道DatagramChannel通道何时是可读的呢?和 SocketChannel一样,同样需要用到NIO的新组件——Selector通道选择器。

1
2
3
4
// 创建缓冲区
ByteBuffer buf = ByteBuffer.allocate(1024);
// 从 DatagramChannel 读取,再写入 ByteBuffer 缓冲区
SocketAddress clientAddr = datagramChannel.receive(buf);

向DatagramChannel发送数据,和向SocketChannel通道发送数据 的方法是不同的。这里不是调用write()方法,而是调用send()方法。由于UDP是面向非连接的协议,因此在调用send()方法发送数据时 需要指定接收方的地址(IP和端口)。

1
2
3
4
5
6
// 把缓冲区翻转为读模式
buffer.flip();
// 调用send()方法,把数据发送到目标IP+端口
dChannel.send(buffer, new InetSocketAddress("127.0.0.1",18899));
// 清空缓冲区,切换到写模式
buffer.clear();


通道的关闭

这个比较简单,直接调用close()方法即可关闭数据报通道。

1
2
// 简单关闭即可
dChannel.close();


发送数据的案例

在客户端使用 DatagramChannel 发送数据比在客户端使用 SocketChanne l发送数据要简单得多。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.DatagramChannel;
import java.util.Scanner;

public class UDPClient {
public static void main(String[] args) throws IOException {
new UDPClient().send();
}

public void send() throws IOException {
// 获取DatagramChannel
DatagramChannel dChannel = DatagramChannel.open();
// 设置为非阻塞
dChannel.configureBlocking(false);
ByteBuffer buffer = ByteBuffer.allocate(1024);
Scanner scanner = new Scanner(System.in);
System.out.println("UDP客户端启动成功!");
System.out.println("请输入发送内容: ");
while (scanner.hasNext()) {
String next = scanner.next();
buffer.put(next.getBytes());
buffer.flip();
// 通过DatagramChannel发送数据
dChannel.send(buffer, new InetSocketAddress("127.0.0.1",9001));
buffer.clear();
}
// 关闭DatagramChannel
dChannel.close();
}
}

在服务端,首先调用了bind()方法绑定DatagramChannel的监听端 口。当数据到来时调用了receive()方法,从DatagramChannel接收数 据后写入ByteBuffer缓冲区中。在服务端代码中,为了监控数据的到来,使用了Selector。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.DatagramChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.util.Iterator;

public class UDPServer {
public static void main(String[] args) throws IOException {
new UDPServer().receive();
}

public void receive() throws IOException {
// 获取DatagramChannel
DatagramChannel datagramChannel = DatagramChannel.open();
// 设置为非阻塞模式
datagramChannel.configureBlocking(false);
// 绑定监听地址
datagramChannel.bind(new InetSocketAddress("127.0.0.1",9001));
System.out.println("UDP服务器启动成功!");
int i = datagramChannel.validOps();

// 开启一个通道选择器
Selector selector = Selector.open();
// 将通道注册到选择器
datagramChannel.register(selector, SelectionKey.OP_READ);
// 通过选择器查询IO事件
while (selector.select() > 0) {
Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
ByteBuffer buffer = ByteBuffer.allocate(1024);
// 迭代IO事件
while (iterator.hasNext()) {
SelectionKey selectionKey = iterator.next();
// 立即从 selectedKeys 集合中移除
iterator.remove();
// 可读事件,有数据到来
if (selectionKey.isReadable()) {
// 读取 DatagramChannel 数据
SocketAddress client = datagramChannel.receive(buffer);
buffer.flip();
System.out.println(new String(buffer.array(), 0, buffer.limit()));
buffer.clear();
}
}
}
//关闭选择器和通道
selector.close();
datagramChannel.close();
}
}


附1.分散和聚集通道

概念介绍

  • 分散读取(scattering reads):将通道中的数据按顺序分散到多个缓冲区中
  • 聚集写入(gathering writes):将多缓冲区中的数据按顺序聚集到通道中

分散或聚集 IO 是使用多个而不是单个缓冲区来保存数据的读写方法。一个分散的读取就像一个常规通道读取,只不过它是将数据读到一个缓冲区数组中而不是读到单个缓冲区中。同样地,一个聚集写入是向缓冲区数组而不是向单个缓冲区写入数据。分散或聚集 IO 对于将数据流划分为单独的部分很有用,这有助于实现复杂的数据格式。


通道接口

通道可以有选择地实现两个新的接口: ScatteringByteChannelGatheringByteChannel

ScatteringByteChannel 是一个具有两个附加读方法的通道。这些read()方法很像标准的read方法,只不过它们不是取单个缓冲区而是取一个缓冲区数组。在分散读取中,通道依次填充每个缓冲区。填满一个缓冲区后,它就开始填充下一个。在某种意义上,缓冲区数组就像一个大缓冲区。

  • long read( ByteBuffer[] dsts );
  • long read( ByteBuffer[] dsts, int offset, int length );

GatheringByteChannel是一个具有两个附加写方法的通道。聚集写对于把一组单独的缓冲区中组成单个数据流很有用。为了与上面的消息例子保持一致,您可以使用聚集写入来自动将网络消息的各个部分组装为单个数据流,以便跨越网络传输消息。

  • long write( ByteBuffer[] srcs );
  • long write( ByteBuffer[] srcs, int offset, int length );


具体应用案例

分散/聚集 I/O 对于将数据划分为几个部分很有用。例如,您可能在编写一个使用消息对象的网络应用程序,每一个消息被划分为固定长度的头部和固定长度的正文。您可以创建一个刚好可以容纳头部的缓冲区和另一个刚好可以容纳正文的缓冲区。当您将它们放入一个数组中并使用分散读取来向它们读入消息时,头部和正文将整齐地划分到这两个缓冲区中。我们从缓冲区所得到的方便性对于缓冲区数组同样有效。因为每一个缓冲区都跟踪自己还可以接受多少数据,所以分散读取会自动找到有空间接受数据的第一个缓冲区。在这个缓冲区填满后,它就会移动到下一个缓冲区。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
@Test
public void test03() throws IOException {
// 获取通道
RandomAccessFile raf1 = new RandomAccessFile("http-header-body.txt", "rw");
RandomAccessFile raf2 = new RandomAccessFile("http-header-body2.txt", "rw");
FileChannel inChannel = raf1.getChannel();
FileChannel outChannel = raf2.getChannel();

// 获取缓冲区
ByteBuffer buffer1 = ByteBuffer.allocate(40);
ByteBuffer buffer2 = ByteBuffer.allocate(100);

// 分散读取和聚集写入
ByteBuffer[] buffers = {buffer1, buffer2};
inChannel.read(buffers);
for(ByteBuffer buffer : buffers) {
buffer.flip();
}
System.out.println(new String(buffers[0].array(), 0, buffers[0].limit()));
System.out.println(new String(buffers[1].array(), 0, buffers[1].limit()));
outChannel.write(buffers);

// 关闭通道和流
outChannel.close();
inChannel.close();
raf2.close();
raf1.close();
}


附2.Pipe 管道

如果说 FileChannel 是为了读写文件,SocketChannel 是为了网络通信,那么 Pipe 就是专门为了在同一个 JVM 进程下的两个线程之间进行单向数据传输而设计的。NIO 的管道不是一个简单的字节数组,它由两个特殊的“端点”组成:

  • SinkChannel(汇聚洞口): 负责写入。数据从这里进去。
  • SourceChannel(源头洞口): 负责读取。数据从这里出来。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
import java.nio.charset.StandardCharsets;

/**
* NIO Pipe 是一个典型的单工模式(Single-Duplex)。如果你需要双向通信,必须定义两个 Pipe。
* 它的真正威力在于:你可以把 SourceChannel 设置为 非阻塞模式 并注册到 Selector 上,从而
* 实现 “一个线程监听网络连接的同时,还能监听来自其他线程的内部指令”。
*
* @author KJ
* @description Pipe 传的是原始字节。在需要处理原始数据流(比如伪造一个文件流或处理压缩流)时,Pipe 更接近底层。
*/
public class PipeDemo {

public static void main(String[] args) throws Exception {
// 打开管道,底层由一个 Selector 驱动的循环字节缓冲区支撑
Pipe pipe = Pipe.open();

// 线程 t1:生产者 (Sink)
Thread t1 = new Thread(() -> {
// 使用 try-with-resources 确保发送 EOF 信号
try (Pipe.SinkChannel sinkChannel = pipe.sink()) {
for (int i = 0; i < 5; i++) {
String msg = "来自线程 t1 的数据包 [" + i + "]";
byte[] bytes = msg.getBytes(StandardCharsets.UTF_8);

// 分配与数据等长的 Buffer,或者使用池化的 Buffer
ByteBuffer buffer = ByteBuffer.wrap(bytes);

while (buffer.hasRemaining()) {
sinkChannel.write(buffer);
}

System.out.println("[发送端] 已写入: " + msg);
Thread.sleep(1000); // 模拟耗时任务
}
} catch (Exception e) {
e.printStackTrace();
}
System.out.println("[发送端] 传输结束,关闭 SinkChannel");
}, "t1");

// 线程 t2:消费者 (Source)
Thread t2 = new Thread(() -> {
try (Pipe.SourceChannel sourceChannel = pipe.source()) {
// 使用稍大的 Buffer 减少系统调用次数
ByteBuffer buffer = ByteBuffer.allocate(1024);

// read 返回 -1 表示对端 SinkChannel 已关闭
while (sourceChannel.read(buffer) != -1) {
// 切换为读模式
buffer.flip();

// 不要直接使用 .array(),因为那会忽略 position 和 limit
byte[] received = new byte[buffer.remaining()];
buffer.get(received);

String msg = new String(received, StandardCharsets.UTF_8);
System.out.println("[接收端] 收到数据: " + msg);

// 清空并重置,准备下次读取
buffer.clear();
}
} catch (Exception e) {
e.printStackTrace();
}
System.out.println("[接收端] 检测到对端关闭,读取结束");
}, "t2");

t1.start();
t2.start();
}
}


选择器(selector)

什么是选择器

简单地说,选择器的使命是完成IO的多路复用,其主要工作是通 道的注册、监听、事件查询。一个通道代表一条连接通路,通过选择器可以同时监控多个通道的IO(输入输出)状况。选择器和通道的关 系是监控和被监控的关系。选择器提供了独特的API方法,能够选出(select)所监控的通道 已经发生了哪些IO事件,包括读写就绪的IO操作事件。

在NIO编程中,一般是一个单线程处理一个选择器,一个选择器可以监控很多通道。所以通过选择器,一个单线程可以处理数百、数千、数万甚至更多的通道。在极端情况下(数万个连接),只用一个线程就可以处理所有的通道,这样会大量地减少线程之间上下文切换的开销。

通道和选择器之间的关联通过register(注册)的方式完成。调用通道的 Channel.register(Selector sel,int ops) 方法,可以将通道实例注册到一个选择器中。register 方法有两个参数:第一个参数指定通道注册到的选择器实例;第二个参数指定选择器要监控的IO事件类型。可供选择器监控的通道IO事件类型包括以下四种:

  • 可读就绪事件:SelectionKey.OP_READ, 1 << 0
  • 可写就绪事件:SelectionKey.OP_WRITE, 1 << 2
  • 连接就绪事件:SelectionKey.OP_CONNECT, 1 << 3
  • 接收就绪事件:SelectionKey.OP_ACCEPT, 1 << 4

以上事件类型常量定义在SelectionKey类中。如果选择器要监控 通道的多种事件,可以用“按位或”运算符来实现。例如,同时监控 可读和可写IO事件:

1
2
// 监控通道的多种事件,用“按位或”运算符来实现
int key = SelectionKey.OP_READ | SelectionKey.OP_WRITE;

这里有必要解释一下什么是IO事件?IO事件不是对通道的IO操作,而是通道处于某个IO操作的就绪状态,表示通道具备执 行某个IO操作的条件。例如,某个SocketChannel传输通道如果完成了 和对端的三次握手过程,就会发生“连接就绪”(OP_CONNECT)事 件;某个ServerSocketChannel服务器连接监听通道,在监听到一个新 连接到来时,则会发生“接收就绪”(OP_ACCEPT)事件;一个 SocketChannel通道有数据可读,就会发生“读就绪”(OP_READ)事 件;一个SocketChannel通道等待数据写入,就会发生“写就绪” (OP_WRITE)事件。


SelectableChannel 类

并不是所有的通道都是可以被选择器监控或选择的。例如, FileChannel就不能被选择器复用。判断一个通道能否被选择器监控或选择有一个前提:判断它是否继承了抽象类 SelectableChannel(可选 择通道)。SelectableChannel 提供了实现通道可选择性 所需要的公共方法。Java NIO中所有网络连接socket通道都继承了 SelectableChannel类,都是可选择的。FileChannel并没有继承 SelectableChannel,因此不是可选择通道。


SelectionKey 类

通道和选择器的监控关系注册成功后就可以选择就绪事件,具体 的选择工作可调用 Selector 的 select() 方法来完成。通过该方法,选择器可以不断地选择通道中所发生操作的就绪状态,返回注册过的那些感兴趣的IO事件。换句话说,一旦在通道中发生了某些IO事件(就绪状态达成),并且是在选择器中注册过的IO事件,就会被选择器选中,并放入 SelectionKey(选择键)的集合中。

SelectionKey 是什么呢?简单地说,SelectionKey 就是那些被选择器选中的IO事件。一个IO事件发生(就绪状态达成) 后,如果之前在选择器中注册过,就会被选择器选中,并放入 SelectionKey中;如果之前没有注册过,那么即使发生了IO事件,也 不会被选择器选中。SelectionKey和IO的关系可以简单地理解为 SelectionKey 就是被选中了的IO事件。通过 SelectionKey,我们不仅可以获得通道的IO事件类型,还可以获得发生IO事件所在的通道,甚至可以获得选择器实例。


选择器的使用步骤

第一步:获取选择器实例。

1
2
3
// 内部是向选择器SPI发出请求,通过默认的 SelectorProvider(选择器提供者)对象获取一个新的选择器实例
// Java通过SPI的方式提供选择器的默认实现版本。也就是说,我们可以通过SPI的方式提供定制化版本的选择器
Selector selector = Selector.open();

第二步:将通道注册到选择器实例。

1
2
3
4
5
6
7
8
9
10
11
// 获取通道
ServerSocketChannelserverSocketChannel = ServerSocketChannel.open();
// 设置为非阻塞(注意:注册到选择器的通道必须处于非阻塞模式下,否则 IllegalBlockingModeException)
serverSocketChannel.configureBlocking(false);
// 绑定连接
serverSocketChannel.bind(new InetSocketAddress(18899));
// 将通道注册到选择器上,并指定监听连接就绪事件。
//// 一个通道并不一定支持所有的四种IO事件。
//// 比如服务器监听通道 ServerSocketChannel 仅支持 Accept(接收到新连接)IO 事件。而 SocketChannel 不支持该事件。
//// 可以在注册之前通过通道的 validOps() 方法来获取该通道支持的所有IO事件集合。
serverSocketChannel.register(selector,SelectionKey.OP_ACCEPT);

第三步:选出感兴趣的IO就绪事件(选择键集合)。

1
Set selectedKeys = selector.selectedKeys();

第四步:迭代集合的每一个选择键,根据具体IO事件类型执行对 应的业务操作。用于选择就绪的IO事件的select()方法有多个重载的实现版本, 具体如下:

  • select():阻塞调用,直到至少有一个通道发生了注册的 IO 事件。
  • select(long timeout):和select()一样,但最长阻塞时间 为timeout 指定的毫秒数。
  • selectNow():非阻塞,不管有没有 IO 事件都会立刻返回。

select() 方法的返回值是整数类型(int),表示发生了IO事件的数量,即从上一次select到这一次select之间有多少通道发生了IO事 件,更加准确地说是发生了选择器感兴趣(注册过)的IO事件数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
while (selector.select() > 0) {
Set selectedKeys = selector.selectedKeys();
Iterator keyIterator = selectedKeys.iterator();
while(keyIterator.hasNext()) {
SelectionKey key = keyIterator.next();
// 立即移除选择键,确保移除一定执行!
keyIterator.remove();
// 处理Key被取消的情况
if (!key.isValid()) {
continue;
}

try {
// 根据具体的IO事件类型执行对应的业务操作
if(key.isAcceptable()) {
handleAccept(key); // IO事件:ServerSocketChannel 服务器监听通道有新连接
} else if (key.isConnectable()) {
handleConn(key); // IO事件:传输通道连接成功
} else if (key.isReadable()) {
handleRead(key); // IO事件:传输通道可读
} else if (key.isWritable()) {
handleWrite(key); // IO事件:传输通道可写
}
} catch (CancelledKeyException e) {
// Key在处理过程中被取消,忽略
} catch (Exception e) {
// 处理业务异常
handleException(key, e);
// 重要:异常后取消key
if (key.isValid()) {
key.cancel();
try { key.channel().close(); } catch (IOException ignored) {}
}
}
}
}


Discard服务演示案例

服务端:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;

public class NioDiscardServer {
public static void main(String[] args) throws IOException {
startServer();
}

// 复用缓冲区(线程不安全,但这里服务端采用的是单线程处理,所以没问题,每次用完 BUFFER_CACHE 也不必remove)
private static final ThreadLocal<ByteBuffer> BUFFER_CACHE =
ThreadLocal.withInitial(() -> ByteBuffer.allocateDirect(8192));

public static void startServer() throws IOException {
Selector selector = null;
ServerSocketChannel serverChannel = null;

try {
// 获取选择器
selector = Selector.open();
// 获取通道
serverChannel = ServerSocketChannel.open();
// 设置为非阻塞
serverChannel.configureBlocking(false);
// 绑定连接
serverChannel.bind(new InetSocketAddress(9002));
// 将通道注册的接收新连接IO事件注册到选择器上
serverChannel.register(selector, SelectionKey.OP_ACCEPT);
System.out.println("NIO Discard 服务器启动在端口 9002");

// 轮询感兴趣的IO就绪事件(选择键集合)
while (!Thread.currentThread().isInterrupted()) {
// 设置超时,避免完全阻塞
int readyChannels = selector.select(1000);
if (readyChannels == 0) {
// 超时,可以执行一些后台任务
idleTask();
continue;
}

// 获取选择键集合
Iterator<SelectionKey> keyIterator = selector.selectedKeys().iterator();
while (keyIterator.hasNext()) {
// 获取单个的选择键,并处理
SelectionKey selectedKey = keyIterator.next();
// 立即移除选择键
keyIterator.remove();
// 处理Key已被取消的情况
if (!selectedKey.isValid()) {
continue;
}
try {
if (selectedKey.isAcceptable()) {
handleAccept(selectedKey, selector);
} else if (selectedKey.isReadable()) {
handleRead(selectedKey);
} else if (selectedKey.isWritable()) {
handleWrite(selectedKey);
}
} catch (IOException e) {
System.err.println("处理事件失败: " + e.getMessage());
// 清理资源
cleanupKey(selectedKey);
} catch (CancelledKeyException e) {
// Key已被取消,忽略
}
}
}
} catch (IOException e) {
System.err.println("服务器异常: " + e.getMessage());
throw e;
} finally {
// 清理资源
if (selector != null && selector.isOpen()) {
for (SelectionKey key : selector.keys()) {
cleanupKey(key);
}
try { selector.close(); } catch (IOException ignored) {}
}
if (serverChannel != null && serverChannel.isOpen()) {
try { serverChannel.close(); } catch (IOException ignored) {}
}
}
}

private static void idleTask() {
// 服务器空闲时可以执行的任务
// 如:清理超时连接、统计信息等
// System.out.println("服务器空闲中...");
}

private static void cleanupKey(SelectionKey key) {
if (key != null) {
key.cancel(); // 从Selector中取消注册
SelectableChannel channel = key.channel();
if (channel != null && channel.isOpen()) {
try {
channel.close();
System.out.println("关闭连接: " + (channel instanceof SocketChannel ?
((SocketChannel) channel).getRemoteAddress() : ""));
} catch (IOException e) {
// 忽略关闭异常
}
}
}
}

private static void handleAccept(SelectionKey key, Selector selector) throws IOException {
ServerSocketChannel serverChannel = (ServerSocketChannel) key.channel();
SocketChannel clientChannel = null;
try {
clientChannel = serverChannel.accept();
if (clientChannel == null) {
return; // 可能被其他线程处理了
}
clientChannel.configureBlocking(false);

// 注册读事件,可以附加附件对象
clientChannel.register(selector, SelectionKey.OP_READ);
System.out.println("接受新连接: " + clientChannel.getRemoteAddress());
} catch (IOException e) {
System.err.println("接受连接失败: " + e.getMessage());
if (clientChannel != null) {
clientChannel.close();
}
throw e;
}
}

private static void handleRead(SelectionKey key) throws IOException {
SocketChannel channel = (SocketChannel) key.channel();

// 获取线程本地缓冲区
ByteBuffer buffer = BUFFER_CACHE.get();
buffer.clear(); // 重置缓冲区

// 防御编程
int bytesRead;
try {
bytesRead = channel.read(buffer);
} catch (IOException e) {
// 连接异常关闭
System.err.println("读取数据失败: " + e.getMessage());
cleanupKey(key);
return;
}
if (bytesRead == -1) {
// 客户端正常关闭
System.out.println("连接关闭: " + channel.getRemoteAddress());
cleanupKey(key);
return;
}
if (bytesRead == 0) {
// 没有数据可读,保持连接
return;
}

// 处理接收到的数据
buffer.flip();
byte[] data = new byte[buffer.remaining()];
buffer.get(data);

// 这里是Discard服务器,所以只打印,不回复
String message = new String(data, StandardCharsets.UTF_8);
System.out.printf("收到来自 %s 的数据[%d字节]: %s%n",
channel.getRemoteAddress(), bytesRead,
message.length() > 50 ? message.substring(0, 50) + "..." : message);

// 如果是quit命令,关闭连接
if (message.trim().equalsIgnoreCase("quit")) {
System.out.println("客户端请求退出: " + channel.getRemoteAddress());
cleanupKey(key);
return;
}

// 如果需要回复,可以注册写事件
// key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
// key.attach("需要回复的数据");
}

private static void handleWrite(SelectionKey key) throws IOException {
SocketChannel channel = (SocketChannel) key.channel();
Object attachment = key.attachment();

if (attachment instanceof String response) {
ByteBuffer buffer = ByteBuffer.wrap(response.getBytes(StandardCharsets.UTF_8));
while (buffer.hasRemaining()) {
channel.write(buffer);
}
// 写完后取消写关注,避免繁忙循环
key.interestOps(key.interestOps() & ~SelectionKey.OP_WRITE);
key.attach(null); // 清理附件
}
}
}

客户端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class NioDiscardClient {
public static void main(String[] args) throws Exception {
testMultipleClients();
}

private static void testMultipleClients() throws InterruptedException {
ExecutorService executor = Executors.newFixedThreadPool(10);
for (int i = 0; i < 5; i++) {
int clientId = i;
executor.submit(() -> sendMessages(clientId));
}
executor.shutdown();
executor.awaitTermination(10, TimeUnit.SECONDS);
}

private static void sendMessages(int clientId) {
try (SocketChannel channel = SocketChannel.open()) {
channel.configureBlocking(true); // 客户端可以用阻塞模式
channel.connect(new InetSocketAddress("127.0.0.1", 9002));
if (channel.finishConnect()) {
System.out.printf("Client-%d 连接成功%n", clientId);
// 发送多条消息
for (int i = 0; i < 3; i++) {
String message = String.format("Hello from Client-%d, Message-%d", clientId, i);
ByteBuffer buffer = ByteBuffer.wrap(message.getBytes(StandardCharsets.UTF_8));
while (buffer.hasRemaining()) {
channel.write(buffer);
}
System.out.printf("Client-%d 发送: %s%n", clientId, message);
Thread.sleep(1000);
}
// 发送退出命令
ByteBuffer quitBuffer = ByteBuffer.wrap("quit".getBytes());
channel.write(quitBuffer);
System.out.printf("Client-%d 发送退出命令%n", clientId);
}
} catch (IOException | InterruptedException e) {
System.err.printf("Client-%d 错误: %s%n", clientId, e.getMessage());
}
}
}

服务端日志:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
NIO Discard 服务器启动在端口 9002
接受新连接: /127.0.0.1:56128
接受新连接: /127.0.0.1:56130
接受新连接: /127.0.0.1:56129
接受新连接: /127.0.0.1:56131
接受新连接: /127.0.0.1:56132
收到来自 /127.0.0.1:56132 的数据[30字节]: Hello from Client-3, Message-0
收到来自 /127.0.0.1:56128 的数据[30字节]: Hello from Client-1, Message-0
收到来自 /127.0.0.1:56130 的数据[30字节]: Hello from Client-4, Message-0
收到来自 /127.0.0.1:56131 的数据[30字节]: Hello from Client-0, Message-0
收到来自 /127.0.0.1:56129 的数据[30字节]: Hello from Client-2, Message-0
收到来自 /127.0.0.1:56131 的数据[30字节]: Hello from Client-0, Message-1
收到来自 /127.0.0.1:56132 的数据[30字节]: Hello from Client-3, Message-1
收到来自 /127.0.0.1:56128 的数据[30字节]: Hello from Client-1, Message-1
收到来自 /127.0.0.1:56130 的数据[30字节]: Hello from Client-4, Message-1
收到来自 /127.0.0.1:56129 的数据[30字节]: Hello from Client-2, Message-1
收到来自 /127.0.0.1:56131 的数据[30字节]: Hello from Client-0, Message-2
收到来自 /127.0.0.1:56132 的数据[30字节]: Hello from Client-3, Message-2
收到来自 /127.0.0.1:56128 的数据[30字节]: Hello from Client-1, Message-2
收到来自 /127.0.0.1:56130 的数据[30字节]: Hello from Client-4, Message-2
收到来自 /127.0.0.1:56129 的数据[30字节]: Hello from Client-2, Message-2
收到来自 /127.0.0.1:56132 的数据[4字节]: quit
客户端请求退出: /127.0.0.1:56132
收到来自 /127.0.0.1:56131 的数据[4字节]: quit
客户端请求退出: /127.0.0.1:56131
收到来自 /127.0.0.1:56128 的数据[4字节]: quit
客户端请求退出: /127.0.0.1:56128
收到来自 /127.0.0.1:56130 的数据[4字节]: quit
客户端请求退出: /127.0.0.1:56130
收到来自 /127.0.0.1:56129 的数据[4字节]: quit
客户端请求退出: /127.0.0.1:56129


字符集与编解码

NIO支持的字符集

1
2
3
4
5
6
// NIO支持的字符集
@Test
public void test04() {
SortedMap<String,Charset> charsets = Charset.availableCharsets();
charsets.forEach((k,v) -> System.out.println("k=" + k + ", v=" + v));
}

常见的有GBK、UTF-8、UTF-16、ASCII、ISO-8859-1等。


字符编码的使用案例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
@Test
public void test05() throws IOException {
Charset charset = Charset.forName("GBK");
CharBuffer charBuffer = CharBuffer.allocate(1024);
charBuffer.put("你好,keyllo");
// 编码得到byteBuffer
CharsetEncoder encoder = charset.newEncoder();
charBuffer.flip();
ByteBuffer byteBuffer = encoder.encode(charBuffer);
for (int i = 0; i < byteBuffer.limit(); i++) { // limit=12
System.out.println(byteBuffer.get());
}
// 解码得到charBuffer
CharsetDecoder decoder = charset.newDecoder();
byteBuffer.flip();
CharBuffer charBuffer2 = decoder.decode(byteBuffer);
System.out.println(charBuffer2.toString()); // 你好,keyllo
}